home *** CD-ROM | disk | FTP | other *** search
-
- #include <stdio.h>
- #include <stdlib.h>
- #include <time.h>
-
- #include <exec/exec.h>
- #include <proto/exec.h>
- #include <proto/dos.h>
-
- #include "queue.h"
-
- #include "queue_library.h" /* so we can look at internals */
-
/*
 * Stress-test switches.  Both are defined as empty macros and are only
 * tested with #ifdef further down in main():
 *   LISTEN_REOPEN - the listening side occasionally flags its queue
 *                   handle for a close/reopen cycle.
 *   SEND_REOPEN   - the sending side does the same.
 * Flip the condition to 0 to build with both switches off.
 */
#if 1
#  define LISTEN_REOPEN
#  define SEND_REOPEN
#endif
-
- #if defined (__GNUC__)
- #include "queue_inline.h"
- #elif defined (__SASC)
- #include "queue_pragmas.h"
- #endif
-
- #if !defined (__SASC)
- struct ExecBase *SysBase = NULL;
- struct DosLibrary *DOSBase = NULL;
- #endif
- struct Library *QueueBase = NULL;
-
- ULONG sigbit = -1;
- QMessage *Qmsg = NULL;
-
- #define QMSG_DATA_SIZE 1
- #define PID(msg) (*(ULONG *)(msg->qm_Data))
-
- void
- Block (int *abort)
- {
- int r;
-
- if (r = rand () % 3)
- Delay (r * 5);
- if (SetSignal (0, SIGBREAKF_CTRL_C) & SIGBREAKF_CTRL_C)
- *abort = 1;
- }
-
- void
- cleanup (void)
- {
- if (sigbit != -1)
- {
- FreeSignal (sigbit);
- }
- #if !defined (__SASC)
- if (DOSBase)
- {
- CloseLibrary ((struct Library *) DOSBase);
- }
- #endif
- if (QueueBase)
- {
- if (Qmsg)
- QFreeMsg (Qmsg, QMSG_DATA_SIZE);
- CloseLibrary (QueueBase);
- }
- }
-
- void
- PrintQueue (QHandle *qhandle)
- {
- QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
- QueueHandle *qh = (QueueHandle *) qhandle;
- char s[512], *ptr;
- QMessage *msg, *next, *err_msg = NULL;
- int count = 0, replies, qnread;
-
- // ObtainSemaphore (qn -> qn_Semaphore);
- Forbid ();
- ptr = s;
- msg = (QMessage *) qn -> qn_List.lh_Head;
- while (next = (QMessage *) msg -> qm_MinNode.mln_Succ)
- {
- switch (msg -> qm_Status)
- {
- case QMS_ACTIVE:
- sprintf (ptr, "[%da:%d],", PID (msg), msg -> qm_Replies); break;
- case QMS_REMOVED:
- sprintf (ptr, "[%dr:%d],", PID (msg), msg -> qm_Replies); break;
- case QMS_MARKER:
- sprintf (ptr, "[m],"); break;
- default:
- sprintf (ptr, "[** %x **],", msg -> qm_Status); break;
- }
- if (msg -> qm_Status == QMS_ACTIVE ||
- msg -> qm_Status == QMS_REMOVED)
- {
- if (msg -> qm_Replies > qn -> qn_Read)
- err_msg = msg;
- }
- ptr = s + strlen (s);
- msg = next;
- if (count ++ > 42)
- break;
- }
- if (qh -> qh_Mode == QMODE_LISTEN)
- {
- if (msg = qh -> qh_un.qhl.qhl_Message)
- sprintf (ptr, " current = %d.\n", PID (msg));
- else
- sprintf (ptr, " current = <null>.\n");
- }
- else
- {
- sprintf (ptr, " qn_Read = %d.\n", qn -> qn_Read);
- }
- if (err_msg)
- {
- replies = err_msg -> qm_Replies;
- qnread = qn -> qn_Read;
- }
- Permit ();
- // ReleaseSemaphore (qn -> qn_Semaphore);
- printf (s);
-
- if (err_msg)
- {
- printf ("\nToo many replies: %d (qn_Read = %d).\n", replies, qnread);
- while (1)
- Delay (100);
- }
- }
-
- int
- main (int argc, char *argv[])
- {
- ULONG sigmask;
- QMessage *msg = NULL;
- QHandle qh = NULL;
- char *qn, *queuename = "testqueue";
- int r, abort = 0, reopen = 1;
- ULONG pid = 0;
-
- if (argc == 2)
- {
- pid = ((struct Process *) FindTask (0)) -> pr_TaskNum;
- printf ("Server ( pid = %ld ).\n", pid);
- }
- else
- printf ("Client.\n", pid);
-
- if (qn = getenv ("TESTQUEUE"))
- {
- printf ("Queue = %s.\n", qn);
- queuename = qn;
- }
-
- #if !defined (__SASC)
- SysBase = *(struct ExecBase **)4;
- if (!(DOSBase = (struct DosLibrary *) OpenLibrary ("dos.library", 37)))
- return 20;
- #endif
-
- atexit (cleanup);
-
- if ((sigbit = AllocSignal (-1)) == -1)
- {
- printf ("Failed to allocate signal.\n");
- return 20;
- }
- sigmask = 1 << sigbit;
-
- if (!(QueueBase = OpenLibrary ("queue.library", 0)))
- {
- printf ("Failed to open queue.library.\n");
- return 20;
- }
- if (!(Qmsg = QAllocMsg ( QMSG_DATA_SIZE )))
- {
- printf ("Couldn't allocate message.\n");
- return 20;
- }
- if (!pid) /* Client */
- {
- r = time (NULL);
- srand (r);
-
- while (1)
- {
- if (!abort && !reopen)
- {
- printf ("Waiting ...\n");
- abort = Wait (sigmask | SIGBREAKF_CTRL_C) & SIGBREAKF_CTRL_C;
- printf ("Signal.\n");
- }
- if (abort || reopen)
- {
- if (qh)
- {
- Forbid ();
- while (msg = QGetMsg (qh));
- QClose (qh);
- Permit ();
- if (abort)
- {
- printf ("Exiting.\n");
- return 0;
- }
- }
- }
- if (reopen)
- {
- Delay (50);
-
- if (!(qh = QOpen (queuename, QMODE_LISTEN, sigbit)))
- {
- printf ("Failed to open \"%s\" queue.\n", queuename);
- return 20;
- }
- printf ("Queue \"%s\" opened.\n", queuename);
- reopen = 0;
- }
-
- PrintQueue (qh);
-
- if (rand () > RAND_MAX/2) /*** Process all messages ***/
- {
- r = 0;
-
- while (msg = QGetMsg (qh))
- {
- printf ("Got message from %d.\n", PID (msg));
- r ++;
- }
- if (!r)
- printf ("Got no message(s).\n");
- }
- else /*** Reply one message & block ***/
- {
- if (msg = QGetMsg (qh))
- printf ("Got message from %d (reply, block).\n", PID (msg));
- else
- printf ("Got no message (reply, block).\n");
-
- if (rand () > RAND_MAX/2)
- {
- QReplyMsg (qh);
- Block (&abort);
- }
- else
- {
- Block (&abort);
- QReplyMsg (qh);
- }
- }
- #ifdef LISTEN_REOPEN
- if (rand () < RAND_MAX/10)
- reopen = 1;
- #endif
- }
- }
- else
- {
- PID (Qmsg) = pid;
-
- while (1)
- {
- if (!abort && !reopen)
- {
- printf ("Waiting ...\n");
- abort = Wait (sigmask | SIGBREAKF_CTRL_C) & SIGBREAKF_CTRL_C;
- printf ("Signal.\n");
- }
- if (abort || reopen)
- {
- if (qh)
- {
- while ((r = QClose (qh)) > 0)
- {
- printf ("Cannot exit: %d message(s) left in queue.\n", r);
- PrintQueue (qh);
- Delay (10);
- QGetMsg (qh);
- }
- if (abort)
- {
- printf ("All done.\n");
- return 0;
- }
- }
- }
- if (reopen)
- {
- if (!(qh = QOpen (queuename, QMODE_SEND, sigbit)))
- {
- printf ("Failed to open \"%s\" queue.\n", queuename);
- return 20;
- }
- printf ("Queue \"%s\" opened.\n", queuename);
- msg = Qmsg;
- reopen = 0;
- }
- else
- {
- msg = QGetMsg (qh);
- if (msg == Qmsg)
- printf ("Got my message (%d).\n", PID (msg));
- else if (msg)
- printf ("Got wrong message (%d).\n", PID (msg));
- else
- printf ("Got no message.\n");
-
- Block (&abort);
- }
- if (!abort)
- {
- #ifdef SEND_REOPEN /* ERROR */
- if (rand () < RAND_MAX/10)
- reopen = 1;
- #endif
- if (msg == Qmsg)
- {
- printf ("Sending message (%d): ", PID (msg));
- QAddMsg (qh, Qmsg);
- PrintQueue (qh);
- }
- }
- }
- }
- }
-